Dataset Usage¶
The Dataset Object¶
Dataset
is the primary class of the package. It is a general-purpose “container” which stores your function, its runs, and their results/errors.
For the purposes of this tutorial, we will bring back our basic multiply
function you may have seen in the quickstart guide, though we will amend it so that the delay time is adjustable:
[2]:
def multiply(a, b, t=1):
    import time
    time.sleep(t)
    return a * b
We can now set up a Dataset
. The only required argument is the function, though there are many other optional arguments, most of which we shall also cover in this tutorial. See the Dataset API documentation for full details.
Again, for this tutorial we will be using a local url, which enables the functions to run anywhere and be tested.
Note
At a basic level, the URL
is a connection to your machine, and can be swapped out at any time to change machines. In theory any function which runs on URL('machine.a')
will also run just the same on URL('machine.b')
.
[3]:
import time
from remotemanager import Dataset, URL
url = URL('localhost')
ds = Dataset(function=multiply,
             url=url,
             script='#!/bin/bash',
             submitter='bash',
             local_dir='temp_ds_staging',
             remote_dir='temp_ds_remote',
             name='tutorial_dataset',
             skip=False)
The arguments shown here are likely to be the ones used the most. In short:
url: The remote connection. If it is not given, a default localhost “connection” will be created for you.
local_dir: The directory that will be used to “stage” your files before sending them to the remote. Defaults to temp_runner_local.
remote_dir: The remote directory where files will be sent. Defaults to temp_runner_remote.
name: Datasets can be named, which makes their files easier to locate. By default, files will simply use the uuid of the dataset/runner to differentiate them.
skip: Contextual argument. If set to False, it disables the Dataset init “skip”, forcing it to delete the existing database and start anew.
Extra Variables¶
If you wish to run on a machine which has a scheduler system, you can use the script
variable to pass your jobscript, though there are more advanced features in place for generating dynamic jobscripts; see the Scheduler Tutorial for more info.
You can also specify a run_dir
, which will be an internal directory within remote_dir
. By default it is not specified, and runs will execute directly within the remote_dir
.
dbfile
allows you to force the dataset to store its database within a specific filename, should you wish to keep track of this. Otherwise, it defaults to {self.name}-{self.short_uuid}.yaml
.
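As a rough sketch, these extra variables could be combined as below. This is illustrative only: the jobscript content, run_dir name, and dbfile name are example values chosen for this sketch, not package defaults.
# a minimal sketch combining the extra variables above (illustrative only)
scheduler_ds = Dataset(function=multiply,
                       url=URL('localhost'),
                       script='#!/bin/bash',          # stands in for a real scheduler jobscript
                       run_dir='run',                 # runs execute inside remote_dir/run
                       dbfile='tutorial_extra.yaml',  # pin the database filename
                       name='tutorial_extra')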
Appending Runs¶
Before running your function you must append runs containing any arguments.
Dataset.append_run()
allows for run creation, and at minimum requires a dict
containing the required arguments for your function.
So in our case, a dictionary containing arguments for a
and b
are necessary for a run to begin. As t
has a default value of 1, it is optional. The structure below will append 3 runs displaying this behaviour:
Note
This is also true for runs that take no arguments; simply call append_run().
[4]:
runs = [{'a': 10, 'b': 5},
        {'a': 5.7, 'b': 8.4},
        {'a': 4, 'b': 4, 't': 6}]

for run in runs:
    ds.append_run(args=run)
appended run runner-0
appended run runner-1
appended run runner-2
Note
There is also the alias arguments
for args
Additionally, if you wish to run scripts within unique folders, you can specify a run_dir
when appending runs. If this attribute is present, the folder will be created within the remote_dir and the function will be run from inside it. You may need to adjust your scripts and additional files to suit this run behaviour.
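As an illustration (not executed here, so the runner count above is unchanged; the directory names are arbitrary):
# illustrative only: each of these runs would execute in its own folder under remote_dir
ds.append_run(args={'a': 2, 'b': 3}, run_dir='run_0')
ds.append_run(args={'a': 3, 'b': 4}, run_dir='run_1')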
The Runner object¶
Now we have a dataset which can be run to return our results. Before we do this, it is worth stepping through some useful debugging tools.
Firstly, how to query what runs you already have. This can be done by accessing the property Dataset.runners
:
[5]:
ds.runners
[5]:
[tutorial_dataset-a2c088ba-runner-0,
tutorial_dataset-a2c088ba-runner-1,
tutorial_dataset-a2c088ba-runner-2]
There is also the runner_dict
property, which returns the same information in a {append id: runner} dict format.
[6]:
ds.runner_dict
[6]:
{'runner-0': tutorial_dataset-a2c088ba-runner-0,
'runner-1': tutorial_dataset-a2c088ba-runner-1,
'runner-2': tutorial_dataset-a2c088ba-runner-2}
Lazy Append¶
Added in version 0.8.4.
If you have a lot of runners to append (especially ones with large arguments), the base append_run
can begin to slow down drastically. For such situations, you can use a context manager to wrap your run appends.
Here we copy the dataset (so as not to add too much bloat to the tutorial), then add 10 more runs:
[7]:
import copy

example_ds = copy.deepcopy(ds)

with example_ds.lazy_append() as la:
    for i in range(10):
        la.append_run({'a': i, 'b': 0})

print(len(example_ds.runners))

del example_ds
Of 13 appends: 13 appended
See get_append_log for more info
13
There is also a lazy
option which does the same thing. However, once you are done appending runs, you must add a finish_append()
call, which finalises the appends all at once as though they were appended normally.
Warning
Omitting the finish_append()
after using a lazy append will not raise an error, but can cause strange behaviour.
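A rough sketch of that pattern, assuming lazy is passed as a keyword to append_run (illustrative only, and not executed as part of this tutorial):
# illustrative sketch of the lazy keyword pattern
lazy_ds = copy.deepcopy(ds)

for i in range(10):
    lazy_ds.append_run({'a': i, 'b': 0}, lazy=True)  # assumed keyword, see the lazy option above

# without this call the appends are never finalised (see the warning above)
lazy_ds.finish_append()

del lazy_ds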
Running the Dataset¶
Running of the datasets is done via the Dataset.run()
method. This gives you one final opportunity to override any run arguments, as it provides another run_args
catch for extra keyword args.
Note
Be aware of the argument expansion limitation that exists with rsync versions below 3.0.0
. If you get errors during transfer, be sure to check rsync --version
reports a version of at least 3.
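One quick way to check the local version from within the notebook (note that the rsync on the remote machine is what matters for transfers to that machine, so check there too):
import subprocess

# print the first line of the local rsync version report
result = subprocess.run(['rsync', '--version'], capture_output=True, text=True)
print(result.stdout.splitlines()[0])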
[8]:
ds.run()
Staging Dataset... Staged 3/3 Runners
Transferring for 3/3 Runners
Transferring 9 Files... Done
Remotely executing 3/3 Runners
[8]:
True
If you’re following along on your machine you may have noticed that this call completed instantly, yet our function has a time.sleep
line in it. We would expect to have to wait 8s for this (1+1+6s delays).
This is because the dataset run defaults to being asynchronous. This can be changed by passing asynchronous as a run_arg
wherever you wish.
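As a rough sketch, the same run argument could be supplied at several different levels. The dataset-level and append-time usages below are assumptions based on the “wherever you wish” behaviour described above (only the run-time form is demonstrated later in this tutorial), so check the API documentation for the exact precedence:
# illustrative only: supplying a run arg such as asynchronous at different levels
sync_ds = Dataset(function=multiply, name='sync_example', asynchronous=False)  # dataset-level default (assumed)
sync_ds.append_run(args={'a': 2, 'b': 2}, asynchronous=False)                  # per-runner value (assumed)
sync_ds.run(asynchronous=False)                                                # run-time override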
Waiting for Completion¶
Calculations can take time. You have two (non-exclusive) options for dealing with this:
Leave the notebook for a while and rerun when you think the jobs have finished
Use wait
Rerunning the notebook at any time will cause the inbuilt skip
methods to kick in and make sure that any running or completed jobs are not resubmitted. This means that you can submit and leave the notebook. On rerun, any fetch_results
calls which failed before will grab the results this time.
Note
Rerunning the notebook works fine provided you have not specified skip=False
or force=True
anywhere.
You can also use the wait
keyword. This is a one line wrapper for a block that looks similar to this:
interval = 2
timeout = 10

t0 = time.time()
while not ds.all_finished():
    time.sleep(interval)

    if time.time() - t0 > timeout:
        break
This periodically checks for completed runs every interval
seconds. It is also a blocking call, returning only once ds.all_finished
returns True or more time than timeout
has passed.
[9]:
ds.wait(interval=2, timeout=10)
The call here means to check every 2 seconds, and raise a timeout error after 10 total seconds have passed.
Note
By default, wait
waits for any completion, including failures. You can restrict this to wait for a total success (timing out if there are failures) by passing success_only=True
.
Asynchronous¶
Asynchronous behaviour also means that all runners run simultaneously. This can put excess load on machines not designed for it, or may simply not be what you want for your workflow. To avoid this, we can use asynchronous=False.
Additionally here, we must use the force=True
keyword to ensure that the runs go through, as the previous runs are marked as complete
. Be careful using this keyword in workflows with long jobs: if they are still running and complete before your more recent run, it will cause their results to be “injected”.
[11]:
ds.reset_runs(wipe=True, confirm=False)
t0 = time.perf_counter()
ds.run(asynchronous=False)
dt = time.perf_counter() - t0
# we expect that the synchronous run will take around 1+1+6=8s
expected_time = 8
# the test suite can take extra time here, need to leave ~2s of room
assert abs(dt - expected_time) < 2, f"run completed in {dt}s"
print(f"run completed in {dt:.2f}s")
Staging Dataset... Staged 0/3 Runners
Transferring for 3/3 Runners
Transferring 9 Files... Done
Remotely executing 3/3 Runners
run completed in 8.12s
While not particularly useful in a wide range of use cases, there may be a situation where you want to wait for a short run to complete, and this also displays the amending of run variables nicely.
One final way to set the run args is via the set_run_arg
method:
[12]:
ds.set_run_arg('asynchronous', True)
print(ds.run_args["asynchronous"])
ds.set_run_arg('new_option', 'value!')
print(ds.run_args["new_option"])
True
value!
Collecting Results¶
There are functions intended to be used after a run has been called, to interact with the run or its results.
We shall cover:
is_finished
all_finished
fetch_results
results
errors
Dataset.is_finished
¶
This property returns a boolean list built from the is_finished
values of the runners. Runners are considered finished
when they have either returned a result or failed with an error.
Dataset.all_finished
¶
This property returns the all() of Dataset.is_finished
To demonstrate these, we shall re-run and see what the state looks like at a few time intervals. But first, we must make sure that the results are not already present.
[13]:
print('wiping result files...')
# this function will clear any runner results and optionally wipe local files
ds.reset_runs(wipe=True, confirm=False)
wiping result files...
Let’s add a run that will fail, to demonstrate how errors are handled:
[14]:
# we can't multiply an int by None, so this should fail
ds.append_run({'a': 0, 'b': None})
appended run runner-3
[15]:
time.sleep(1) # this short sleep prevents earlier runs getting in the way
print('calcs launched, waiting before checking completion')
ds.run(asynchronous=True)
time.sleep(2)
print('\nafter 2s, state is now:')
print(ds.is_finished)
print('all_finished:', ds.all_finished)
time.sleep(5)
print('\nafter 7s, state is now:')
print(ds.is_finished)
print('all_finished:', ds.all_finished)
calcs launched, waiting before checking completion
Staging Dataset... Staged 1/4 Runners
Transferring for 4/4 Runners
Transferring 11 Files... Done
Remotely executing 4/4 Runners
after 2s, state is now:
[True, True, False, True]
all_finished: False
after 7s, state is now:
[True, True, True, True]
all_finished: True
It may seem counter-intuitive that the runs are all completed at 7s, but recall that they were launched asynchronously, so the whole run takes only around 6s (our maximum delay time).
The remaining functions¶
Dataset.fetch_results()
¶
This function will attempt to grab any results from files or function objects that are attached to the dataset, storing them in the results
property
Dataset.results
¶
This property allows optimised access to the results of the previous run. When results
is queried, it also checks to see if there are any errors, and warns you if any are found.
Dataset.errors
¶
Similar to results
, this stores a list of the error content if available.
[16]:
ds.fetch_results()
Fetching results
Transferring 7 Files... Done
[17]:
ds.results
Warning! Found 1 error(s), also check the `errors` property!
[17]:
[50,
47.88,
16,
RunnerFailedError('TypeError: unsupported operand type(s) for *: 'int' and 'NoneType'')]
[18]:
ds.errors
[18]:
[None,
None,
None,
"TypeError: unsupported operand type(s) for *: 'int' and 'NoneType'"]
Further features¶
While we touched on the runner availability earlier, we skipped over a feature which may be helpful for debugging purposes. The Runner
object has a history
property which prints a {time: state} dict that contains information about all state changes the runner has experienced.
This runner has been run and rerun a few times now, so the history will be quite full. On a fresh Dataset, a flag will be set to wipe this history.
[19]:
ds.runners[0].history
[19]:
{'2025-06-06 13:51:07/0': 'created',
'2025-06-06 13:51:07/1': 'staged',
'2025-06-06 13:51:07/2': 'transferred',
'2025-06-06 13:51:07/3': 'submit pending',
'2025-06-06 13:51:07/4': 'submitted',
'2025-06-06 13:51:07/5': 'started',
'2025-06-06 13:51:08/0': 'completed',
'2025-06-06 13:51:15/0': 'reset',
'2025-06-06 13:51:15/1': 'transferred',
'2025-06-06 13:51:15/2': 'submit pending',
'2025-06-06 13:51:23/0': 'reset',
'2025-06-06 13:51:24/0': 'transferred',
'2025-06-06 13:51:24/1': 'submit pending',
'2025-06-06 13:51:24/2': 'submitted',
'2025-06-06 13:51:24/3': 'started',
'2025-06-06 13:51:25/0': 'completed',
'2025-06-06 13:51:31/0': 'satisfied'}
Here you can see the state history for the first runner in the list, showing the three runs, the creation time of the result file on the remote, and the final completion state where the results were loaded back into the runner.
If you just require a list of states (for example, to check whether a runner has passed through a state), there is the Runner.status_list property.
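For example, something along these lines would check whether the first runner has ever passed through a given state (a sketch which assumes status_list is a plain list of state strings, matching the history output above):
# sketch: query the recorded states of the first runner
states = ds.runners[0].status_list
print(states)
print('submitted' in states)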
Swapping out the serialiser¶
This is now covered in more depth in the dedicated tutorial.
Access to the commands used to execute the runs¶
Once you have run a dataset, you can access the command used to execute the bash scripts. This can be useful for debugging purposes.
[20]:
print('raw command:', ds.run_cmd.sent)
print('returned stdout:', ds.run_cmd.stdout)
print('returned stderr:', ds.run_cmd.stderr)
raw command: cd temp_ds_remote && bash tutorial_dataset-a2c088ba-master.sh
returned stdout:
returned stderr:
Running a single runner¶
While it was mentioned previously that the runners themselves should ideally not be touched, and all interaction should be done via the Dataset
, it is possible to run a single runner if necessary.
Warning
This process is inefficient and should only be used if absolutely required. It may be preferable to clear the results of the offending runner using reset_runs()
and then rerun with skip=True.
[21]:
# store what the current last submission time is
last_submitted_initial = ds.runners[0].last_submitted
[22]:
ds.reset_runs(confirm=False) # clear results to demonstrate
ds.runners[0].run(asynchronous=False)
time.sleep(1)
ds.fetch_results()
Staging Dataset... Staged 0/4 Runners
Transferring for 1/4 Runners
Transferring 5 Files... Done
Remotely executing 1/4 Runners
Fetching results
Transferring 2 Files... Done
[23]:
# get the new last submission time
last_submitted_after = ds.runners[0].last_submitted
This quick assertion makes sure that the runner that was resubmitted actually has a different submission time.
[24]:
assert last_submitted_initial != last_submitted_after
Here we can again demonstrate the use of check_all_runner_states
: since we have only run one runner, checking for full completion will return False. Obviously in this case, all_finished
would do the job, but you can query for any state, such as submitted
.
[25]:
print(ds.check_all_runner_states('completed'))
False